/**
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of SAFSlib.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <string.h>

#include "comp_io_scheduler.h"

namespace safs
{

static const int COMPLETE_QUEUE_SIZE = 10240;

comp_io_scheduler::comp_io_scheduler(int node_id): incomplete_computes(node_id,
		COMPLETE_QUEUE_SIZE, true)
{
	io = NULL;
}

/*
 * The post-computation steps include:
 *	add the user task to a queue if it needs to issue more requests;
 *	deallocate the user computation if it's no longer being used.
 */
void comp_io_scheduler::post_comp_process(user_compute *compute)
{
	// If we can't fetch all requests generated by the computation,
	// the computation has completed, we need to keep it and it will run
	// when the required data is available.
	bool has_requests = compute->has_requests();
	bool has_completed = compute->has_completed();
	if (has_requests || !has_completed) {
		if (!compute->test_flag(user_compute::IN_QUEUE)) {
			compute->inc_ref();
			incomplete_computes.push_back(compute);
			compute->set_flag(user_compute::IN_QUEUE, true);
		}
	}

	// This is to decrease the count for the reference when the user compute
	// is attached to an I/O request.
	compute->dec_ref();
	// If no one else is referencing the user compute, it means it's not
	// in the queue. We can delete it now.
	if (compute->get_ref() == 0) {
		assert(!compute->test_flag(user_compute::IN_QUEUE));
		compute_allocator *alloc = compute->get_allocator();
		alloc->free(compute);
	}
}

void comp_io_scheduler::gc_computes()
{
	int size = incomplete_computes.get_num_entries();
	for (int i = 0; i < size; i++) {
		user_compute *compute = incomplete_computes.front();
		incomplete_computes.pop_front();
		if (compute->has_completed()) {
			delete_compute(compute);
		}
		else {
			// The computation hasn't been completed, we need to add it
			// back to the queue for further processing.
			incomplete_computes.push_back(compute);
		}
	}
}

size_t default_comp_io_scheduler::get_requests(
		fifo_queue<io_request> &requests, size_t max)
{
	// If the request queue is already full, don't do anything.
	if (requests.is_full() || max == 0)
		return 0;

	size_t num = 0;
	bool from_begin;
	do {
		compute_iterator end = this->get_end();
		if (curr_it == end)
			curr_it = this->get_begin();
		from_begin = (curr_it == this->get_begin());
		for (; curr_it != end; ++curr_it) {
			user_compute *compute = *curr_it;
			int ret = compute->fetch_requests(get_io(), requests,
					min(max - num, requests.get_num_remaining()));
			num += ret;
			has_completed |= compute->has_completed();
			// We should limit the number of pending requests generated by
			// user compute.
			if (requests.is_full() || num == max) {
				assert(ret > 0);
				break;
			}
			// If there were some requests left in a user compute, we must have
			// got enough requests. Then we would reach here.
			assert(!compute->has_requests());
		}
		// If we haven't got enough requests and we didn't iterate the queue
		// from the beginning, we should try again.
	} while (!from_begin && !requests.is_full() && num < max);
	assert(num <= max);
	return num;
}

void default_comp_io_scheduler::gc_computes()
{
	// We only garbage collect user computes when there are completed ones.
	if (has_completed) {
		comp_io_scheduler::gc_computes();
		has_completed = false;
		// The garbage collection reorders the elements in the user compute queue.
		// The original iterator should be reset.
		curr_it = this->get_end();
	}
}

}
